# coding: utf-8
import time
import traceback
# reload(sys)

# sys.setdefaultencoding('utf-8')
import hashlib
# import ConfigParser
# import OaDataSystem

# sys.setdefaultencoding('utf-8')
# import CrmOperator
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json
from threading import Thread
from time import sleep
import traceback
# sys.setdefaultencoding('utf-8')
import hashlib
import json
import time
import traceback
from threading import Thread
from time import sleep

# sys.setdefaultencoding('utf-8')
# import CrmOperator
from kafka import KafkaConsumer
from kafka.structs import TopicPartition


# reload(sys)
# import ConfigParser
# import OaDataSystem


class kafkaprocess():
    """Handles batches of Kafka records pulled by the consumer loop."""

    def msg_process(self, message_set, class_set):
        """Log the raw batch, then print the key of any record whose JSON
        payload has data.teacher_id == 6557 (a hard-coded test account)."""
        class_set['kafka_log'].logOperator(1, str(message_set))
        for record in message_set:
            payload = json.loads(record.value.decode('utf-8'))
            teacher_id = int(payload['data']['teacher_id'])
            if teacher_id == 6557:
                print("test0=====", record.key.decode('utf-8'))


def getmd5(filename):
    """Return the hex MD5 digest of the file at *filename*.

    The file is opened in binary mode: ``hashlib.md5()`` requires bytes
    (the previous text-mode ``open(filename, "r")`` produced a ``str`` and
    raised TypeError on Python 3), and binary mode also keeps the digest
    independent of platform newline translation.  A ``with`` block
    guarantees the handle is closed even if reading fails.
    """
    with open(filename, "rb") as fd:
        return hashlib.md5(fd.read()).hexdigest()


def working(config):
    """Config hot-reload loop: every 5 seconds re-hash ``para.conf`` and,
    if its MD5 differs from the cached ``conf_md5``, re-read the file into
    *config* and remember the new digest.  Runs forever (daemon thread)."""
    global conf_md5
    while True:
        sleep(5)
        current_md5 = getmd5('para.conf')
        if current_md5 != conf_md5:
            config.read("para.conf")
            conf_md5 = current_md5


def get_offset_time_window(consumer, partitions_structs, begin_time, end_time):
    """Seek *consumer* to the first offset at/after *begin_time* on every
    partition in *partitions_structs*.

    *begin_time* / *end_time* may be epoch milliseconds (int) or local-time
    strings in ``%Y-%m-%d %H:%M:%S`` format.

    Returns ``(consumer, e_offset)`` where ``e_offset`` is the end-window
    offset of the *last* partition iterated, or ``'null'`` when no partition
    produced one.  (NOTE(review): returning only the last partition's end
    offset looks like an existing design quirk; callers should probably get
    a per-partition mapping — left unchanged for compatibility.)
    """
    begin_search = {}
    for partition in partitions_structs:
        begin_search[partition] = begin_time if isinstance(begin_time, int) else __str_to_timestamp(begin_time)
    begin_offset = consumer.offsets_for_times(begin_search)
    print("test31=====", begin_search)
    print("test32=======", begin_offset)

    end_search = {}
    for partition in partitions_structs:
        end_search[partition] = end_time if isinstance(end_time, int) else __str_to_timestamp(end_time)
    end_offset = consumer.offsets_for_times(end_search)

    # BUG FIX: e_offset was unbound (NameError at return) when begin_offset
    # was empty; initialize it to the same sentinel the loop uses.
    e_offset = 'null'
    for topic_partition, offset_and_timestamp in begin_offset.items():
        b_offset = 'null' if offset_and_timestamp is None else offset_and_timestamp[0]
        e_offset = 'null' if end_offset[topic_partition] is None else end_offset[topic_partition][0]
        print(
            'Between {0} and {1}, {2} offset range = [{3}, {4}]'.format(begin_time, end_time, topic_partition, b_offset,
                                                                        e_offset))
        if b_offset != 'null':
            print("test33========", topic_partition, b_offset)
            consumer.seek(topic_partition, b_offset)
    return consumer, e_offset


def __str_to_timestamp(str_time, format_type='%Y-%m-%d %H:%M:%S'):
    """Convert a local-time string to epoch *milliseconds* (truncated int).

    Uses time.mktime, so the result depends on the host's local timezone.
    """
    parsed = time.strptime(str_time, format_type)
    epoch_seconds = time.mktime(parsed)
    return int(epoch_seconds) * 1000


def workline1():
    """Main binlog-consuming thread: replay a fixed time window from Kafka.

    Builds a SASL/PLAIN consumer for the configured topic, seeks every
    partition to the offset at ``begin_time``, then polls forever and hands
    each batch of records to ``kp.msg_process``.

    BUG FIX: ``KafkaConsumer.poll()`` returns a dict mapping
    ``TopicPartition -> list[ConsumerRecord]``.  The previous code iterated
    ``.values()`` and treated each *list* as a single record
    (``par.offset`` / ``par.partition`` / ``par.topic``), raising
    AttributeError on every non-empty poll — which the blanket ``except``
    then swallowed, so no message was ever processed.  We now iterate
    ``.items()`` and feed the record lists to ``msg_process``.
    """
    try:
        begin_time = '2019-08-01 10:54:15'
        end_time = '2019-08-20 19:01:15'
        # SECURITY NOTE(review): SASL credentials are hard-coded here; they
        # should be moved into para.conf or the environment and rotated.
        consumer = KafkaConsumer(group_id=config.get("db", "main_group_id"),
                                 sasl_plain_username='xes_oa', sasl_plain_password='CnYN88zKd44tV7ng',
                                 security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN',
                                 bootstrap_servers=config.get("db", "bootstrap_servers")
                                 )

        topic = str(config.get("db", "main_topic_id"))
        tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
        print("test30======", consumer.config['api_version'], tps)

        consumer.assign(tps)
        consumer, end_offset = get_offset_time_window(consumer, tps, begin_time, end_time)

        message_sets = []
        while True:
            try:
                # poll() -> {TopicPartition: [ConsumerRecord, ...]}
                for tp, records in consumer.poll(max_records=20).items():
                    if not records:
                        continue
                    # TODO(review): stop once this partition passes
                    # ``end_offset`` for the time window — the original
                    # author left this unimplemented (see the old Chinese
                    # comment: "if the partition reached the end-timestamp
                    # offset: stop").
                    message_sets += records
                    kp.msg_process(message_sets, class_set)
                    del message_sets[:]
            except Exception as e:
                traceback.print_exc()
                print(Exception, ":================", e)

    except Exception as e:
        print(Exception, ":", e)

# --- module-level bootstrap --------------------------------------------------
# BUG FIX: ``ConfigParser`` was referenced without being imported (the
# Python-2 style ``import ConfigParser`` at the top of the file is commented
# out), so this line raised NameError at startup.  On Python 3 the module is
# ``configparser``; imported here so this bootstrap is self-contained.
import configparser

conf_md5 = getmd5('para.conf')
# read the config file
config = configparser.ConfigParser()
config.read("para.conf")
print('config...')


if __name__ == '__main__':
    print(time.time())

    # NOTE(review): ``CrmOperator`` is referenced here but its import is
    # commented out at the top of the file — this raises NameError at
    # startup unless the (project-local) import is restored.  Left as-is
    # because the module is not available in this review context.
    class_set = {}
    class_set['kafka_log'] = CrmOperator.CrmOperator('kafka_flow_test', 'all-type')

    kp = kafkaprocess()

    # Config hot-reload thread: re-reads para.conf when its MD5 changes.
    # ``daemon=True`` replaces the deprecated Thread.setDaemon() call.
    t1 = Thread(target=working, args=(config,), daemon=True)
    t1.start()

    # The main binlog-consuming thread.
    lineT1 = Thread(target=workline1, daemon=True)
    lineT1.start()

    # Keep the main thread alive so the daemon workers keep running.
    print('main thread running')
    while True:
        sleep(5)
